package com.xiaofan.java;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * the Batch SQL API is used in Java
 */
public class WordCountSQL_E0002 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Hi", 1),
                new WC("Hello", 1)
        );

        tableEnv.createTemporaryView("wordcount", input, $("word"), $("frequency"));

        Table table = tableEnv.sqlQuery("select word, sum(frequency) as frequency from wordcount group by word");

        Table filterTable = table.filter($("frequency").isEqual(2));

        DataSet<WC> result = tableEnv.toDataSet(filterTable, WC.class);

        result.print();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WC {
        public String word;
        public long frequency;

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
